﻿using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Windows.Forms;

namespace RabbitMQDemo
{
    public partial class RPC : Form
    {
        /// <summary>
        /// The single shared instance of this form; assigned once by the static constructor.
        /// </summary>
        private static readonly RPC _RPC;

        // Delegate slot taking a text value and a target TextBox. Nothing in this
        // part of the partial class assigns or invokes it.
        private Action<string, TextBox> SetText;

        /// <summary>
        /// Type initializer: creates the singleton form instance.
        /// </summary>
        static RPC()
        {
            _RPC = new RPC();
        }

        /// <summary>
        /// Singleton accessor: always returns the same form instance.
        /// </summary>
        public static RPC SingleForm
        {
            get
            {
                return _RPC;
            }
        }

        /// <summary>
        /// Private constructor; code outside this class obtains the form through
        /// <see cref="SingleForm"/>.
        /// </summary>
        private RPC()
        {
            // Switch off the WinForms check for illegal cross-thread control access
            // before the designer-generated controls are built.
            CheckForIllegalCrossThreadCalls = false;
            InitializeComponent();
        }

        /// <summary>
        /// Click handler for the send button. Acts as the RPC client: sends the text of
        /// <c>txtPublisher</c> as a request and appends the reply, followed by a line
        /// break, to <c>txtRpcClient</c>.
        /// </summary>
        /// <param name="sender">Event source (not used).</param>
        /// <param name="e">Event data (not used).</param>
        private void btnSendMsg_Click(object sender, EventArgs e)
        {
            string message = txtPublisher.Text;
            if (string.IsNullOrWhiteSpace(message))
            {
                MessageBox.Show("请输入要发送的消息");
                // Stop here: without this return a blank request would still be published.
                return;
            }

            RpcClient client = new RpcClient();
            try
            {
                // Call blocks the calling (UI) thread until a matching reply is received.
                var response = client.Call(message);
                txtRpcClient.Text += string.Format("{0}\r\n", response);
            }
            finally
            {
                // Always release the broker connection, even if the call or the UI update throws.
                client.Close();
            }
        }

        /// <summary>
        /// Minimal RabbitMQ RPC client: publishes a request to the "rpc_queue" queue and
        /// blocks until a reply carrying the expected correlation id arrives on its own
        /// reply queue.
        /// </summary>
        /// <remarks>
        /// The correlation id is generated once in the constructor, so every
        /// <see cref="Call"/> made on the same instance reuses it.
        /// </remarks>
        private class RpcClient
        {
            #region Fields
            /// <summary>
            /// Connection to the RabbitMQ broker.
            /// </summary>
            private readonly IConnection connection;
            /// <summary>
            /// Channel opened on <see cref="connection"/>.
            /// </summary>
            private readonly IModel channel;
            /// <summary>
            /// Name of the reply queue declared for this client.
            /// </summary>
            private readonly string replyQueueName;
            /// <summary>
            /// Consumer that receives replies from the reply queue.
            /// </summary>
            private readonly EventingBasicConsumer consumer;
            /// <summary>
            /// Hands replies from the consumer callback over to the thread blocked in <see cref="Call"/>.
            /// </summary>
            private readonly BlockingCollection<string> resQueue = new BlockingCollection<string>();
            /// <summary>
            /// Message properties (correlation id and reply-to queue) attached to every request.
            /// </summary>
            private readonly IBasicProperties props;
            #endregion

            /// <summary>
            /// Connects to the broker on localhost, declares the reply queue, prepares the
            /// request properties and starts consuming replies.
            /// </summary>
            public RpcClient()
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                connection = factory.CreateConnection();
                channel = connection.CreateModel();
                replyQueueName = channel.QueueDeclare().QueueName;
                consumer = new EventingBasicConsumer(channel);

                // Tie request and response together through the correlation id and the reply queue.
                var correlationID = Guid.NewGuid().ToString();
                props = channel.CreateBasicProperties();
                props.CorrelationId = correlationID;
                props.ReplyTo = replyQueueName;

                consumer.Received += (model, ea) =>
                {
                    // Only accept responses that belong to this client's requests.
                    if (ea.BasicProperties.CorrelationId == correlationID)
                    {
                        resQueue.Add(Encoding.UTF8.GetString(ea.Body));
                    }
                };

                // Register the consumer exactly once, after the handler is attached.
                // Doing this inside Call would add another consumer registration on
                // every additional call made through the same instance.
                channel.BasicConsume(
                    consumer: consumer,
                    queue: replyQueueName,
                    noAck: true);
            }

            /// <summary>
            /// Publishes <paramref name="msg"/> (UTF-8 encoded) to "rpc_queue" through the
            /// default exchange and blocks until a matching reply is available.
            /// </summary>
            /// <param name="msg">Request text to send.</param>
            /// <returns>The reply body decoded as UTF-8.</returns>
            public string Call(string msg)
            {
                var msgBytes = Encoding.UTF8.GetBytes(msg);
                channel.BasicPublish(exchange: "",
                    routingKey: "rpc_queue",
                    basicProperties: props,
                    body: msgBytes);

                // Blocks with no timeout until the consumer callback adds a reply.
                return resQueue.Take();
            }

            /// <summary>
            /// Closes the broker connection.
            /// </summary>
            public void Close()
            {
                connection.Close();
            }
        }//class
    }
}
